/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.range;



import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.sdk.io.BoundedSource;

import com.bff.gaia.unified.sdk.io.OffsetBasedSource;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkState;



/**
 * A {@link RangeTracker} for non-negative positions of type {@code long}.
 *
 * <p>The tracker covers the half-open offset range {@code [startOffset, stopOffset)}. Every method
 * that reads or writes the tracking state is synchronized on the tracker instance.
 *
 * <p>Not to be confused with {@link
 * com.bff.gaia.unified.sdk.transforms.splittabledofn.OffsetRangeTracker}.
 */
public class OffsetRangeTracker implements RangeTracker<Long> {
  private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class);

  /**
   * Offset corresponding to infinity. This can only be used as the upper-bound of a range, and
   * indicates reading all of the records until the end without specifying exactly what the end is.
   *
   * <p>Infinite ranges cannot be split because it is impossible to estimate progress within them.
   */
  public static final long OFFSET_INFINITY = Long.MAX_VALUE;

  /** Sentinel held by the offset fields until a record (or split point) has been reported. */
  private static final long NO_OFFSET = -1L;

  private long startOffset;
  private long stopOffset;
  private long lastRecordStart = NO_OFFSET;
  private long offsetOfLastSplitPoint = NO_OFFSET;
  private long splitPointsSeen = 0L;
  private boolean done = false;

  /**
   * Creates an {@code OffsetRangeTracker} for the specified range.
   *
   * @param startOffset inclusive lower bound of the range
   * @param stopOffset exclusive upper bound of the range, or {@link #OFFSET_INFINITY}
   */
  public OffsetRangeTracker(long startOffset, long stopOffset) {
    this.startOffset = startOffset;
    this.stopOffset = stopOffset;
  }

  /** Creates a blank tracker; only {@link #copy()} uses this, and it fills in every field. */
  private OffsetRangeTracker() {}

  /**
   * Returns whether tracking has begun, i.e. a record at a split point has been accepted or the
   * tracker has been marked done.
   */
  public synchronized boolean isStarted() {
    // A done tracker counts as started so that an empty reader is handled correctly.
    return done || offsetOfLastSplitPoint != NO_OFFSET;
  }

  /** Returns whether this tracker has been marked done. */
  public synchronized boolean isDone() {
    return done;
  }

  /** Returns the current inclusive start offset of the tracked range. */
  @Override
  public synchronized Long getStartPosition() {
    return startOffset;
  }

  /** Returns the current exclusive stop offset of the tracked range. */
  @Override
  public synchronized Long getStopPosition() {
    return stopOffset;
  }

  /**
   * Boxed variant of {@link #tryReturnRecordAt(boolean, long)}.
   *
   * @throws NullPointerException if {@code recordStart} is null
   */
  @Override
  public boolean tryReturnRecordAt(boolean isAtSplitPoint, Long recordStart) {
    final long unboxedStart = recordStart.longValue();
    return tryReturnRecordAt(isAtSplitPoint, unboxedStart);
  }

  /**
   * Reports that the reader is about to return a record starting at {@code recordStart}.
   *
   * <p>The very first reported record moves the start offset of the range to {@code recordStart}.
   * Every reported record becomes the "last record", even when this method returns false.
   *
   * @param isAtSplitPoint whether the record is at a split point
   * @param recordStart offset at which the record starts
   * @return false if the record is at a split point at or beyond the stop offset, in which case
   *     the tracker is also marked done; true otherwise
   * @throws IllegalStateException if the first record is not at a split point, if the record
   *     starts before the start offset or before the previously reported record, or if a split
   *     point repeats the offset of the previous split point
   */
  public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, long recordStart) {
    final boolean firstRecord = !isStarted();
    if (firstRecord && !isAtSplitPoint) {
      String message =
          String.format("The first record [starting at %d] must be at a split point", recordStart);
      throw new IllegalStateException(message);
    }
    if (startOffset > recordStart) {
      String message =
          String.format(
              "Trying to return record [starting at %d] which is before the start offset [%d]",
              recordStart, startOffset);
      throw new IllegalStateException(message);
    }
    if (lastRecordStart > recordStart) {
      String message =
          String.format(
              "Trying to return record [starting at %d] "
                  + "which is before the last-returned record [starting at %d]",
              recordStart, lastRecordStart);
      throw new IllegalStateException(message);
    }

    // No record reported so far: the range effectively begins at this record.
    if (lastRecordStart == NO_OFFSET) {
      startOffset = recordStart;
    }
    lastRecordStart = recordStart;

    // Records that are not at a split point never end the range and are always accepted.
    if (!isAtSplitPoint) {
      return true;
    }

    if (offsetOfLastSplitPoint == recordStart) {
      String message =
          String.format(
              "Record at a split point has same offset as the previous split point: "
                  + "previous split point at %d, current record starts at %d",
              offsetOfLastSplitPoint, recordStart);
      throw new IllegalStateException(message);
    }
    if (recordStart >= stopOffset) {
      // A split point at or past the stop offset belongs to the next range.
      done = true;
      return false;
    }
    offsetOfLastSplitPoint = recordStart;
    splitPointsSeen++;
    return true;
  }

  /**
   * Boxed variant of {@link #trySplitAtPosition(long)}.
   *
   * @throws NullPointerException if {@code splitOffset} is null
   */
  @Override
  public boolean trySplitAtPosition(Long splitOffset) {
    final long unboxedOffset = splitOffset.longValue();
    return trySplitAtPosition(unboxedOffset);
  }

  /**
   * Attempts to shrink the range to {@code [start, splitOffset)}.
   *
   * <p>The split is refused when the stop offset is {@link #OFFSET_INFINITY}, when the tracker is
   * not started, when {@code splitOffset} is not beyond the last reported record, or when it lies
   * outside {@code [start, stop)}.
   *
   * @param splitOffset proposed new exclusive stop offset
   * @return true if the stop offset was updated to {@code splitOffset}, false if refused
   */
  public synchronized boolean trySplitAtPosition(long splitOffset) {
    if (stopOffset == OFFSET_INFINITY) {
      LOG.debug("Refusing to split {} at {}: stop position unspecified", this, splitOffset);
      return false;
    }
    if (!isStarted()) {
      LOG.debug("Refusing to split {} at {}: unstarted", this, splitOffset);
      return false;
    }

    // Note: strictly speaking, any position after the last returned split point would be a
    // valid split position, not only positions after the last returned record.
    // TODO: Investigate whether in practice this is useful or, rather, confusing.
    final boolean alreadyPassed = lastRecordStart >= splitOffset;
    if (alreadyPassed) {
      LOG.debug(
          "Refusing to split {} at {}: already past proposed split position", this, splitOffset);
      return false;
    }
    final boolean withinRange = splitOffset >= startOffset && splitOffset < stopOffset;
    if (!withinRange) {
      LOG.debug(
          "Refusing to split {} at {}: proposed split position out of range", this, splitOffset);
      return false;
    }

    LOG.debug("Agreeing to split {} at {}", this, splitOffset);
    stopOffset = splitOffset;
    return true;
  }

  /**
   * Returns a position {@code P} such that the range {@code [start, P)} represents approximately
   * the given fraction of the range {@code [start, end)}. Assumes that the density of records in
   * the range is approximately uniform.
   *
   * @param fraction fraction of the range to translate into a position
   * @return the position, rounded down to a whole offset
   * @throws IllegalArgumentException if the stop offset is {@link #OFFSET_INFINITY}
   */
  public synchronized long getPositionForFractionConsumed(double fraction) {
    if (stopOffset == OFFSET_INFINITY) {
      throw new IllegalArgumentException(
          "getPositionForFractionConsumed is not applicable to an unbounded range: " + this);
    }
    final long rangeSize = stopOffset - startOffset;
    final double position = startOffset + fraction * rangeSize;
    return (long) Math.floor(position);
  }

  /**
   * Estimates the consumed fraction of the range as a value between 0.0 and 1.0.
   *
   * <p>Returns 0.0 for an unstarted tracker and for a started, not-yet-done tracker whose stop
   * offset is {@link #OFFSET_INFINITY}; returns 1.0 once the tracker is done.
   */
  @Override
  public synchronized double getFractionConsumed() {
    if (!isStarted()) {
      return 0.0;
    }
    if (isDone()) {
      return 1.0;
    }
    if (stopOffset == OFFSET_INFINITY) {
      // Progress within an unbounded range cannot be estimated.
      return 0.0;
    }
    if (lastRecordStart >= stopOffset) {
      return 1.0;
    }
    // E.g. for the range [3, 6) with lastRecordStart == 4 the result is (4 - 3) / (6 - 3) = 33%.
    // The result is capped at 1.0 because the last reported record may lie past the stop offset.
    final double consumed = lastRecordStart - startOffset;
    final double total = stopOffset - startOffset;
    return Math.min(1.0, consumed / total);
  }

  /**
   * Returns the total number of split points that have been processed.
   *
   * <p>A split point at a particular offset has been seen if there has been a corresponding call to
   * {@link #tryReturnRecordAt(boolean, long)} with {@code isAtSplitPoint} true. It has been
   * processed if there has been a <em>subsequent</em> call to {@link #tryReturnRecordAt(boolean,
   * long)} with {@code isAtSplitPoint} true and at a larger offset.
   *
   * <p>Note that for correctness when implementing {@link
   * BoundedSource.BoundedReader#getSplitPointsConsumed()}, if a reader finishes before {@link
   * #tryReturnRecordAt(boolean, long)} returns false, the reader should add an additional call to
   * {@link #markDone()}. This will indicate that processing for the last seen split point has been
   * finished.
   *
   * @see OffsetBasedSource for a {@link BoundedSource.BoundedReader} implemented using {@link
   *     OffsetRangeTracker}.
   */
  public synchronized long getSplitPointsProcessed() {
    if (!isStarted()) {
      return 0;
    }
    if (isDone()) {
      return splitPointsSeen;
    }
    // The most recently seen split point is still being processed, so it is not counted.
    checkState(
        splitPointsSeen > 0,
        "A started rangeTracker should have seen > 0 split points (is %s)",
        splitPointsSeen);
    return splitPointsSeen - 1;
  }

  /**
   * Marks this range tracker as being done. Specifically, this will mark the current split point,
   * if one exists, as being finished.
   *
   * <p>Always returns false, so that it can be used in an implementation of {@link
   * BoundedSource.BoundedReader#start()} or {@link BoundedSource.BoundedReader#advance()} as
   * follows:
   *
   * <pre>{@code
   * public boolean start() {
   *   return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint, position)
   *       || rangeTracker.markDone();
   * }
   * }</pre>
   */
  public synchronized boolean markDone() {
    done = true;
    return false;
  }

  /**
   * Describes the tracked range, including the start of the last reported record once one exists.
   * An {@link #OFFSET_INFINITY} stop offset is rendered as {@code infinity}.
   */
  @Override
  public synchronized String toString() {
    final String stopString =
        (stopOffset == OFFSET_INFINITY) ? "infinity" : String.valueOf(stopOffset);
    if (lastRecordStart < 0) {
      return String.format("<unstarted in offset range [%d, %s)>", startOffset, stopString);
    }
    return String.format(
        "<at [starting at %d] of offset range [%d, %s)>",
        lastRecordStart, startOffset, stopString);
  }

  /**
   * Returns a copy of this tracker for testing purposes (to simplify testing methods with side
   * effects).
   */
  @VisibleForTesting
  OffsetRangeTracker copy() {
    synchronized (this) {
      OffsetRangeTracker duplicate = new OffsetRangeTracker();
      // "duplicate" is not yet visible to any other thread, so locking it is not needed for
      // correctness; the lock only keeps findbugs from reporting unsynchronized field access.
      synchronized (duplicate) {
        duplicate.startOffset = this.startOffset;
        duplicate.stopOffset = this.stopOffset;
        duplicate.lastRecordStart = this.lastRecordStart;
        duplicate.offsetOfLastSplitPoint = this.offsetOfLastSplitPoint;
        duplicate.splitPointsSeen = this.splitPointsSeen;
        duplicate.done = this.done;
      }
      return duplicate;
    }
  }
}